In [None]:
# Code source: Sebastian Curi and Andreas Krause, based on Jaques Grobler (sklearn demos).
# License: BSD 3 clause

# We start importing some modules and running some magic commands
%matplotlib inline
%reload_ext autoreload
%load_ext autoreload
%autoreload 2

# General math and plotting modules.
import numpy as np
import matplotlib as mpl
import matplotlib.pyplot as plt
from scipy.special import erfinv
from scipy import linalg

# Project files.
from utilities.load_data import polynomial_data, linear_separable_data
from utilities import plot_helpers
from utilities.widgets import noise_widget, n_components_widget, min_prob_widget

# Widget and formatting modules
import IPython
import ipywidgets
from ipywidgets import interact, interactive, interact_manual, fixed
from matplotlib import rcParams
import matplotlib as mpl 
from scipy.stats import multivariate_normal, norm
# If in your browser the figures are not nicely vizualized, change the following line. 
rcParams['figure.figsize'] = (10, 5)
rcParams['font.size'] = 16

# Machine Learning library. 
from sklearn.mixture import GaussianMixture
from sklearn.cluster import KMeans
from sklearn import datasets

import warnings
warnings.filterwarnings("ignore")


# 1-D Gaussian Mixture Models

In [None]:
from ipywidgets import GridspecLayout, jslink
from collections import OrderedDict
grid = GridspecLayout(3, 3)
rcParams['figure.figsize'] = (20, 5)
rcParams['font.size'] = 16
X = np.linspace(-5, 5, 1000)

n_components = 3


# def GMMs(mean1, mean2, mean3, scale1, scale2, scale3, weight1, weight2, weight3):
# means, scales, weights = [mean1, mean2, mean3], [scale1, scale2, scale3], [weight1, weight2, weight3]
def GMM(mean1, mean2, mean3, scale1, scale2, scale3, weight1, weight2, weight3):
    means = [mean1, mean2, mean3]
    scales = [scale1, scale2, scale3]
    weights = [weight1, weight2, weight3]
    gaussians = [norm(mean, scale) for mean, scale in zip(means, scales)]
    scale = sum(weights)
    fig, axes = plt.subplots(1, 2)
    y = np.zeros_like(X)
    for i, (weight, gaussian) in enumerate(zip(weights, gaussians)):
        axes[0].plot(X, gaussian.pdf(X), label=f"{i}-th gaussian")
        y += weight * gaussian.pdf(X) / scale
    axes[0].legend()
    axes[0].set_title('Independent Gaussians')

    axes[1].plot(X, y)
    axes[1].set_title('Gaussian Mixture Model')
    plt.show()
# return fig 

means = OrderedDict()
scales = OrderedDict()
weights = OrderedDict()

means['mean1'] = ipywidgets.FloatSlider(value=-3, min=-4, max=4, step=0.01, continuous_update=False,
                                       description='Means')
means['mean2'] = ipywidgets.FloatSlider(value=0, min=-4, max=4, step=0.01, continuous_update=False)
means['mean3'] = ipywidgets.FloatSlider(value=2, min=-4, max=4, step=0.01, continuous_update=False)
scales['scale1'] = ipywidgets.FloatSlider(value=0.5, min=0.1, max=2, step=0.01, continuous_update=False,
                                         description='Scales')
scales['scale2'] = ipywidgets.FloatSlider(value=1, min=0.1, max=2, step=0.01, continuous_update=False)
scales['scale3'] = ipywidgets.FloatSlider(value=0.1, min=0.1, max=2, step=0.01, continuous_update=False)
weights['weight1'] = ipywidgets.FloatSlider(value=0.33, min=0, max=1, step=0.01, continuous_update=False,
                                            description='Weights')
weights['weight2'] = ipywidgets.FloatSlider(value=0.33, min=0, max=1, step=0.01, continuous_update=False)
weights['weight3'] = ipywidgets.FloatSlider(value=0.33, min=0, max=1, step=0.01, continuous_update=False)
                                           

meanw = ipywidgets.HBox(tuple(means.values()))
scalew = ipywidgets.HBox(tuple(scales.values()))
weightw = ipywidgets.HBox(tuple(weights.values()))

params = dict(means)
params.update(dict(scales))
params.update(dict(weights))
out = ipywidgets.interactive_output(GMM, params)

display(meanw, scalew, weightw, out)
# dict(means).update

# EM Algorithm

In [None]:
def weighted_average(data, weights):
    """Given data and weights calculate weighted average.
    
    Parameters
    ----------
    data: [num_points x dimension] or [num_points x dimension]
    weights:  [num_points x num_components], non-negative
    
    Returns
    -------
    weighted_average: [num_components x dimension] or [num_components x dimension x dimension]
    """
    
    assert np.all(weights >= 0)
    w = weights / weights.sum(axis=0)   
    return w.T @ data 
    

class EMGMM(object):
    def __init__(self, X, num_components, kind='soft', initialization='random', nu=1e-6):
        self.X = X 
        self.dim = X.shape[1]
        self.num_points = X.shape[0]
        self.num_components = num_components
        self.kind = kind 
        self.nu = nu 
        self.initialization = initialization
        
        self.restart()
        
    def init_means(self):
        if self.initialization == 'random':
            self.means = np.random.uniform(
                low=self.X.min(axis=0), high=(self.X.max(axis=0)), 
                size=(self.num_components, self.dim))
        elif self.initialization == 'random from data':
            idx = np.random.choice(self.num_points, self.num_components, replace=False)
            self.means = self.X[idx]
        elif self.initialization == 'kmeans++':
            self.means = np.empty((self.num_components, self.dim))
            i = np.random.choice(self.num_points, 1)
            self.means[0] = self.X[i]
            indexes = [i] 
            for j in range(1, self.num_components):
                probs = np.zeros(self.num_points)
            
                for i, x in enumerate(self.X):
                    d = np.linalg.norm(x - self.means[:j], 2, axis=1) ** 2 
                    probs[i] = np.min(d)
            
                probs = probs / np.sum(probs)
                i = np.random.choice(self.num_points, 1, p = probs)
                assert np.all(probs[np.array(indexes)] == 0)
                # All previous indexes should have zero probability of being selected. 
            
                self.means[j] = self.X[i] 
                indexes.append(i)
        
    def restart(self):        
        self.probabilities = np.ones((self.num_points, self.num_components)) / self.num_components
        self.init_means()
        self.covariances = np.array([np.diag(self.X.std(axis=0)) for _ in range(self.num_components)])
        self.weights = np.random.rand(self.num_components) 
        self.weights /= np.sum(self.weights)
    
    def expectation(self):
        gaussians = [multivariate_normal(mean, cov) for mean, cov in zip(self.means, self.covariances)] 
        
        for i, gaussian in enumerate(gaussians):
            self.probabilities[:, i] = gaussian.pdf(self.X)
        
        if self.kind == 'soft':
            self.probabilities /= self.probabilities.sum(axis=1)[:, np.newaxis]
        else:
            classes = np.argmax(self.probabilities, axis=1)
            aux = np.zeros_like(self.probabilities)
            aux[np.arange(self.num_points), classes] = 1
            self.probabilities = aux
        
    def maximization(self):
        "Fit clusters to weighted data points."
        self.weights = np.mean(self.probabilities, axis=0)
        
        avg_weights = (self.probabilities / self.probabilities.sum(axis=0)).T
        avg_weights = np.nan_to_num(avg_weights, 0)
        self.means = avg_weights @ self.X
        covariances = []
        for mean, weight in zip(self.means, avg_weights):
            delta = self.X - mean
            cov = delta.T @ np.diag(weight) @ delta + self.nu * np.eye(self.dim)
            covariances.append(cov)
        self.covariances = np.array(covariances)
            
    

In [None]:
def get_dataset(dataset, n_samples=200, noise=0.3):
    if dataset == 'linear':
        X, Y = linear_separable_data(n_samples, noise=noise, dim=2) 
        Y = (Y + 1) // 2
    elif dataset == '2-blobs':
        X, Y = datasets.make_classification(n_classes=2, n_features=2, n_informative=2, n_redundant=0,
                                            n_clusters_per_class=1, n_samples=n_samples, random_state=8)
    elif dataset == '3-blobs':
        X, Y = datasets.make_classification(n_classes=3, n_features=2, n_informative=2, n_redundant=0,
                                            n_clusters_per_class=1, n_samples=n_samples, random_state=8)
    elif dataset == '4-blobs':
        X, Y = datasets.make_classification(n_classes=4, n_features=2, n_informative=2, n_redundant=0,
                                            n_clusters_per_class=1, n_samples=n_samples, random_state=8) 
    elif dataset == 'high-dim':
        X, Y = datasets.make_classification(n_classes=3, n_features=10, n_informative=6, n_samples=n_samples,
                                            random_state=8)
        
    elif dataset == 'circles':
        X, Y = datasets.make_circles(noise=noise, n_samples=n_samples, factor=.5)
    elif dataset == 'moons':
        X, Y = datasets.make_moons(noise=noise, n_samples=n_samples)
    elif dataset == 'imbalanced':
        X, Y = linear_separable_data(n_samples, noise=noise, dim=2, num_negative=int(n_samples * 0.2))
        Y = (Y + 1) // 2
    elif dataset == 'varied variance':
        X, Y = datasets.make_blobs(n_samples=n_samples, cluster_std=[1.0, 2.5, 0.5], random_state=170)

    elif dataset == 'correlated':
        X, Y = datasets.make_classification(n_classes=2, n_features=2, n_informative=1, n_redundant=1,
                                            n_clusters_per_class=1, n_samples=n_samples, random_state=8)  
    elif dataset == 'iris':
        X, Y = datasets.load_iris(return_X_y=True)
    elif dataset == 'mnist':
        X, Y = datasets.load_digits(return_X_y=True)
    elif dataset == 'wine':
        X, Y = datasets.load_wine(return_X_y=True)
    return X, Y

In [None]:
def em(dataset, n_centers, kind, initialization, nu):
    X, _ = get_dataset(dataset, noise=0.1)
    X = X[:, :2]
    expectation_button = ipywidgets.Button(description="Expectation Step")
    maximization_button = ipywidgets.Button(description="Maximization Step")
    restart_button = ipywidgets.Button(description="Restart")

    cmap = plt.cm.viridis
    norm = mpl.colors.Normalize(vmin=0, vmax=n_centers - 1)


    algorithm = EMGMM(X, n_centers, kind=kind, initialization=initialization, nu=nu)

    def plot(display_expectation=True):
        fig, ax = plt.subplots(1, 1)

        colors = (algorithm.probabilities * np.arange(n_centers)).sum(axis=1)
        ax.scatter(X[:, 0], X[:, 1], c=cmap(norm(colors)), 
                   cmap=cmap)

        for i, (weight, mean, cov) in enumerate(zip(algorithm.weights, algorithm.means, algorithm.covariances)):
            plot_helpers.plot_ellipse(mean, cov, color=cmap(norm(i)), ax=ax)

        IPython.display.clear_output(wait=True)
        IPython.display.display(fig)
        plt.close()

        if display_expectation:
            display(expectation_button)    
        else:
            display(maximization_button)

        display(restart_button)

    def restart(b):
        algorithm.restart()
        plot(display_expectation=True)

    def expectation(b):
        algorithm.expectation()
        plot(display_expectation=False)

    def maximization(b):
        algorithm.maximization()
        plot(display_expectation=True)

    expectation_button.on_click(expectation)
    maximization_button.on_click(maximization)
    restart_button.on_click(restart)
    plot()

interact(em, dataset=['3-blobs', 'varied variance', 'linear', 'imbalanced', '2-blobs', '4-blobs', 'iris', 'circles', 'moons'], 
         n_centers=ipywidgets.IntSlider(value=3, min=1, max=10),
         kind=['hard', 'soft' ],
         initialization=['random', 'random from data', 'kmeans++'],
         nu=ipywidgets.BoundedFloatText(value=1e-6, min=0, max=1000, step=1e-6, 
                                        description='Nu:', continuous_update=False)
        );

# Covariance Models & Density Estimation

In [None]:
rcParams['figure.figsize'] = (16, 8)
rcParams['font.size'] = 16

def density_estimation(dataset, n_centers, covariance_type, noise):
    resample_button = ipywidgets.Button(description="Resample")
    X, _ = get_dataset(dataset, noise=noise)    
    X = X[:, :2]
    cmap = plt.cm.viridis
    norm = mpl.colors.Normalize(vmin=0, vmax=n_centers - 1)
        
    def resample(b):
        
        model = GaussianMixture(n_components=n_centers, covariance_type=covariance_type).fit(X)


        fig, axes = plt.subplots(1, 2)
        colors = (model.predict_proba(X) * np.arange(n_centers)).sum(axis=1)
        for ax in axes:
            ax.scatter(X[:, 0], X[:, 1], c=cmap(norm(colors)), cmap=cmap)

        weights, means = model.weights_, model.means_

        if covariance_type == 'full':
            covariances = model.covariances_
        elif covariance_type == 'tied':
            covariances = [model.covariances_ for _ in range(n_centers)]
        elif covariance_type == 'diag':
            covariances = [np.diag(cov) for cov in model.covariances_]
        elif covariance_type == 'spherical':
            covariances = [np.eye(X.shape[-1]) * cov for cov in model.covariances_]

        for i, (weight, mean, cov) in enumerate(zip(weights, means, covariances)):
            plot_helpers.plot_ellipse(mean, cov, color=cmap(norm(i)), ax=axes[0])

        xx, yy = plot_helpers.make_meshgrid(X[:, 0], X[:, 1], h=.02)
        xy = np.c_[xx.ravel(), yy.ravel()]
        P = -model.score_samples(xy)
        axes[1].contourf(xx, yy, P.reshape(xx.shape), cmap=cmap, alpha=0.5, 
                         norm=mpl.colors.LogNorm(vmin=1.0, vmax=1000.0), levels=np.logspace(0, 3, 10))

        axes[0].set_aspect('equal')
        axes[0].set_title('Gaussian Fit')
        axes[1].set_aspect('equal')
        axes[1].set_title('Density Estimation')
        
        IPython.display.clear_output(wait=True)
        IPython.display.display(fig)
        plt.close()

        display(resample_button)
    
    resample_button.on_click(resample)
    resample(None)

interact(density_estimation, 
         dataset=['3-blobs', 'varied variance', 'linear', 'imbalanced', '2-blobs', '4-blobs', 'iris', 'circles', 'moons'],  
         n_centers=ipywidgets.IntSlider(value=3, min=1, max=10),
         covariance_type=['full', 'tied', 'diag', 'spherical'],
         noise=ipywidgets.FloatSlider(value=0.2, min=0, max=1, step=0.01, description='Noise:', continuous_update=False)
        );

# GMMs Clustering

In [None]:
def clustering(algorithm, dataset, n_centers, noise):
    resample_button = ipywidgets.Button(description="Resample")

    X, _ = get_dataset(dataset, noise=noise)    
    X = X[:, :2]
    
    def resample(b):
        if algorithm == 'full-GMM':
            model = GaussianMixture(n_components=n_centers, covariance_type='full').fit(X)
        elif algorithm == 'tied-GMM':
            model = GaussianMixture(n_components=n_centers, covariance_type='tied').fit(X)
        elif algorithm == 'diag-GMM':
            model = GaussianMixture(n_components=n_centers, covariance_type='diag').fit(X)
        elif algorithm == 'spherical-GMM':
            model = GaussianMixture(n_components=n_centers, covariance_type='spherical').fit(X)
        elif algorithm == 'KMeans++':
            model = KMeans(n_clusters=n_centers, init='k-means++').fit(X)
        elif algorithm == 'KMeans':
            model = KMeans(n_clusters=n_centers, init='random').fit(X)

        plt.scatter(X[:, 0], X[:, 1], c=model.predict(X))

        IPython.display.clear_output(wait=True)
        IPython.display.display(plt.gcf())
        plt.close()

        display(resample_button)
    
    resample_button.on_click(resample)
    resample(None)
    

interact(clustering, 
         algorithm=['full-GMM', 'KMeans++', 'KMeans', 'tied-GMM', 'diag-GMM', 'spherical-GMM', ],
         dataset=['3-blobs', 'varied variance', 'linear', 'imbalanced', '2-blobs', '4-blobs', 'iris', 'circles', 'moons'],  
         n_centers=ipywidgets.IntSlider(value=3, min=1, max=10),
         noise=ipywidgets.FloatSlider(value=0.2, min=0, max=1, step=0.01, description='Noise:', continuous_update=False)
        );